package org.zstacks.znet;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.zstacks.znet.callback.ConnectedCallback;
import org.zstacks.znet.callback.ErrorCallback;
import org.zstacks.znet.callback.MessageCallback;
import org.zstacks.znet.log.Logger;
import org.zstacks.znet.nio.Dispatcher;
import org.zstacks.znet.nio.Session;
import org.zstacks.znet.ticket.ResultCallback;
import org.zstacks.znet.ticket.Ticket;
import org.zstacks.znet.ticket.TicketManager;


public class RemotingClient extends MessageAdaptor implements Closeable {
	private static final Logger log = Logger.getLogger(RemotingClient.class);
	
	private final Dispatcher dispatcher;
	private Session session;
	protected final String brokerAddress;
	private String host = "127.0.0.1";
	private int port = 15555;
	
	private int readTimeout = 3000;
	private int connectTimeout = 3000;
	private int heartbeatInterval = 60000; //60s
	
	private ConcurrentMap<String, Object> attributes = null;
	private final ScheduledExecutorService heartbeator = Executors.newSingleThreadScheduledExecutor();
	
	private MessageCallback messageCallback;
	private ConnectedCallback connectedCallback;
	private ErrorCallback errorCallback; 
	private final TicketManager ticketManager = new TicketManager();
	
	public RemotingClient(String address, Dispatcher dispatcher){
		this.brokerAddress = address;
		this.dispatcher = dispatcher;
		String[] blocks = address.split("[:]");
		if(blocks.length > 2){
			throw new IllegalArgumentException("Illegal address: "+address);
		}
		String host = blocks[0].trim();
		int port = 15555;
		if(blocks.length > 1){
			port = Integer.valueOf(blocks[1].trim());
		}  
		initialize(host, port, dispatcher);
	}
	
	
	public RemotingClient(String host, int port, Dispatcher dispatcher){
		this.brokerAddress = String.format("%s:%d", host, port);
		this.dispatcher = dispatcher;
		initialize(host, port, dispatcher);
	}
	
	
	private void initialize(String host, int port, Dispatcher dispatcher){
		this.host = host;
		this.port = port; 
		if(!dispatcher.isStarted()){
			dispatcher.start();
		}
		this.heartbeator.scheduleAtFixedRate(new Runnable() {
			public void run() {
				if(RemotingClient.this.hasConnected()){
					Message hbt = new Message();
					hbt.setCommand(Message.HEARTBEAT);
					try {
						RemotingClient.this.send(hbt);
					} catch (IOException e) {  
						//ignore
					}
				}
			}
		}, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
	}
	
	
	protected void doConnect() throws IOException { 
    	if(this.session != null ){
    		if (this.session.isActive() || this.session.isNew()){  
    			return;
    		}
    	} 
    	this.session = dispatcher.registerClientChannel(host, port, this);
	}
	
	public void connect(int timeoutMillis) throws IOException{  
    	doConnect(); 
    	this.session.waitToConnect(timeoutMillis);
    }
    
    public boolean hasConnected(){
    	return session != null && session.isActive();
    }
  
    public void ensureConnected(){ 
		while(!this.hasConnected()){
			try {
				this.connect(connectTimeout);
			} catch (IOException e) {
				log.info(e.getMessage(), e);
			}
		}
	}  
    
    public void connectIfNeed() throws IOException{
    	if(!this.hasConnected()){
    		//同步进行连接操作
    		this.connect(this.connectTimeout);
    	}
    }  
	
	@Override
	public void close() throws IOException {
		if(this.session != null){
			this.session.close();
		}
		this.heartbeator.shutdown();
	} 
	

    public void invokeAsync(Message req, ResultCallback callback) throws IOException { 
    	connectIfNeed();
    	
		Ticket ticket = null;
		if(callback != null){
			ticket = ticketManager.createTicket(req, readTimeout, callback);
		} else {
			if("".equals(req.getMsgId()) || req.getMsgId() == null){//没有设置消息ID则自动生成
				req.setMsgId(Ticket.uuidTicket());
			}
		} 
		try{
			session.write(req);  
		} catch(IOException e) {
			if(ticket != null){
				ticketManager.removeTicket(ticket.getId());
			}
			throw e;
		}  
	} 
    public Message invokeSync(Message req) throws IOException, InterruptedException {
    	return this.invokeSync(req, this.readTimeout);
    }
    /**
     * 返回null标识超时
     * @param req
     * @param timeout
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public Message invokeSync(Message req, int timeout) throws IOException, InterruptedException {
    	Ticket ticket = null; 
		try {
			connectIfNeed();
			ticket = ticketManager.createTicket(req, timeout); 
			session.write(req);
			
			if(!ticket.await(timeout, TimeUnit.MILLISECONDS)){
				if(!session.isActive()){
					throw new IOException("Connection reset by peer");
				} else {
					return null;
				}
			}
		    return ticket.response(); 
		} finally{
			if(ticket != null){
				ticketManager.removeTicket(ticket.getId());
			}
		}   
    } 
    
	 /**
     * asynchronous send message, return message fall into client's callback 
     * 异步发送消息，消息没有Ticket匹配，由Client的消息回调处理
     * @param msg
     * @throws IOException
     */
    public void send(Message msg) throws IOException{
    	connectIfNeed();
    	//没有设置消息ID则自动生成
    	if("".equals(msg.getMsgId()) || msg.getMsgId() == null){
			msg.setMsgId(Ticket.uuidTicket());
		}
    	
    	this.session.write(msg);
    } 
    

	public int getReadTimeout() {
		return readTimeout;
	}

	public void setReadTimeout(int readTimeout) {
		this.readTimeout = readTimeout;
	}

	public int getConnectTimeout() {
		return connectTimeout;
	}

	public void setConnectTimeout(int connectTimeout) {
		this.connectTimeout = connectTimeout;
	}

	public Session getSession() {
		return session;
	}  
	
	public String getBrokerAddress(){
		return this.brokerAddress;
	}
	 
	@SuppressWarnings("unchecked")
	public <T> T attr(String key){
		if(this.attributes == null){
			return null;
		}
		
		return (T)this.attributes.get(key);
	}
	
	public <T> void attr(String key, T value){
		if(this.attributes == null){
			synchronized (this) {
				if(this.attributes == null){
					this.attributes = new ConcurrentHashMap<String, Object>();
				}
			} 
		}
		this.attributes.put(key, value);
	}
    
	
	
	@Override
    public void onMessage(Object obj, Session sess) throws IOException {  
    	Message msg = (Message)obj; 
    	//先验证是否有Ticket处理
    	Ticket ticket = ticketManager.removeTicket(msg.getMsgId());
    	if(ticket != null){
    		ticket.notifyResponse(msg); 
    		return;
    	}  
    	if(messageCallback != null){
    		this.messageCallback.onMessage(msg, sess);
    		return;
    	}
    	
    	log.warn("!!!!!!!!!!!!!!!!!!!!!!!!!!Drop,%s", msg);
	} 
	
	@Override
	public void onException(Throwable e, Session sess) throws IOException {
		if(e instanceof IOException && this.errorCallback != null){
			this.errorCallback.onError((IOException)e, sess);
		} else {
			super.onException(e, sess);
		}
	}
	
	@Override
	public void onSessionConnected(Session sess) throws IOException {
		super.onSessionConnected(sess);
		log.info("Connected: "+sess);
		if(this.connectedCallback != null){
			this.connectedCallback.onConnected(sess);
		}
	}
	
	public void setMessageCallback(MessageCallback messageCallback){
    	this.messageCallback = messageCallback;
    }
    
    public void setErrorCallback(ErrorCallback errorCallback){
    	this.errorCallback = errorCallback;
    } 
    
    public void setConnectedCallback(ConnectedCallback connectedCallback){
    	this.connectedCallback = connectedCallback;
    }

	public int getHeartbeatInterval() {
		return heartbeatInterval;
	}

	public void setHeartbeatInterval(int heartbeatInterval) {
		this.heartbeatInterval = heartbeatInterval;
	}
}
